package sparkSinkToHdfs;

import kafka.serializer.StringDecoder;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Spark Streaming job that reads Kafka topics with the direct (receiver-less) Kafka API and
 * writes every non-empty micro-batch of (key, message) pairs to HDFS using
 * {@code RDDMultipleTextOutputFormat}.
 *
 * <p>Despite the application name {@code "KafkaDirectWordCount"}, no word counting is done:
 * the pairs read from Kafka are written out unchanged.
 *
 * <p>Usage (all arguments optional; the defaults are the values previously hard-coded):
 * <pre>
 *   KafkaDirecReadSinkToHdfs [brokerList] [topic1,topic2,...] [outputBaseDir]
 * </pre>
 * Each batch is written to {@code outputBaseDir/<batchTimeMillis>}.
 *
 * @author Administrator
 */
public class KafkaDirecReadSinkToHdfs {

	/** Default value for the Kafka {@code metadata.broker.list} parameter. */
	private static final String DEFAULT_BROKER_LIST = "10.112.80.6:9092";

	/** Default comma-separated list of Kafka topics to consume. */
	private static final String DEFAULT_TOPICS = "kafka_flume_hdfs1";

	/** Default HDFS directory under which one sub-directory per batch is created. */
	private static final String DEFAULT_OUTPUT_BASE = "hdfs://cdhmanager:8020/flumedata/spark";

	/**
	 * Starts the streaming job and blocks until it terminates.
	 *
	 * @param args optional {@code [brokerList] [topics] [outputBaseDir]}; missing or blank
	 *             entries fall back to the defaults above
	 * @throws InterruptedException if interrupted while waiting for termination
	 * @throws IllegalArgumentException if the topic list contains no topic names
	 */
	public static void main(String[] args) throws InterruptedException {
		String brokerList = argOrDefault(args, 0, DEFAULT_BROKER_LIST);
		String topicList = argOrDefault(args, 1, DEFAULT_TOPICS);
		final String outputBase = argOrDefault(args, 2, DEFAULT_OUTPUT_BASE);

		// setIfMissing keeps local[*] as the default (e.g. when run from an IDE) while letting a
		// master supplied by spark-submit win; setMaster would silently force local mode.
		SparkConf conf = new SparkConf()
				.setAppName("KafkaDirectWordCount")
				.setIfMissing("spark.master", "local[*]");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

		// Kafka consumer parameters for the direct stream.
		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("metadata.broker.list", brokerList);

		// Topics to read; the direct API can consume several topics in parallel.
		Set<String> topics = parseTopics(topicList);
		if (topics.isEmpty()) {
			throw new IllegalArgumentException("No Kafka topic given: '" + topicList + "'");
		}

		// Input DStream of (key, message) pairs, both decoded as strings.
		JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
				jssc,
				String.class,
				String.class,
				StringDecoder.class,
				StringDecoder.class,
				kafkaParams,
				topics);

		lines.foreachRDD(new VoidFunction2<JavaPairRDD<String, String>, Time>() {
			@Override
			public void call(JavaPairRDD<String, String> batch, Time batchTime) throws Exception {
				// With a 1 s batch interval many batches may be empty; writing them would only
				// create empty output directories.
				if (batch.isEmpty()) {
					return;
				}
				// One directory per batch, named after the batch time. Writing every batch into the
				// same directory makes successive batches reuse the same part-file names, so later
				// batches can overwrite earlier output.
				String batchDir = outputBase + "/" + batchTime.milliseconds();
				// NOTE(review): the RDD holds String pairs but Text is declared as key/value class;
				// this works for text formats that call toString() - confirm against
				// RDDMultipleTextOutputFormat.
				batch.saveAsHadoopFile(batchDir, Text.class, Text.class, RDDMultipleTextOutputFormat.class);
			}
		});
		jssc.start();
		jssc.awaitTermination();
		jssc.close();
	}

	/** Returns the trimmed {@code args[index]} if present and non-blank, otherwise {@code fallback}. */
	private static String argOrDefault(String[] args, int index, String fallback) {
		if (args != null && args.length > index && args[index] != null && !args[index].trim().isEmpty()) {
			return args[index].trim();
		}
		return fallback;
	}

	/** Splits a comma-separated topic list into a set, ignoring blank entries and surrounding spaces. */
	private static Set<String> parseTopics(String topicList) {
		Set<String> topics = new HashSet<String>();
		for (String topic : topicList.split(",")) {
			String trimmed = topic.trim();
			if (!trimmed.isEmpty()) {
				topics.add(trimmed);
			}
		}
		return topics;
	}

}
